{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "binding-delta",
   "metadata": {
    "papermill": {
     "duration": 0.016304,
     "end_time": "2021-03-22T20:29:23.476444",
     "exception": false,
     "start_time": "2021-03-22T20:29:23.460140",
     "status": "completed"
    },
    "tags": []
   },
   "source": [
    "# ibm_sql_query"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "bb97e294-9399-4d96-a95c-8ad7e29a2872",
   "metadata": {},
   "source": [
    "Run arbitrary SQL-based data transformation jobs on CSV, PARQUET, JSON, AVRO and ORC data stored on Cloud Object Storage using IBM SQL Query. Transformation results are written back to Cloud Object Storage."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b94a9eec-4230-4287-b4b0-f1d6b5237c54",
   "metadata": {
    "scrolled": true,
    "tags": []
   },
   "outputs": [],
   "source": [
    "import os\n",
    "\n",
    "# Build the container image only on an explicit opt-in. bool() of any non-empty\n",
    "# string is True, so create_image=False or create_image=0 used to trigger a build;\n",
    "# compare against the accepted truthy spellings instead.\n",
    "create_image = str(os.environ.get('create_image', False)).strip().lower() in ('true', '1', 'yes', 'y', 'on')\n",
    "if create_image:\n",
    "    # Dockerfile for an image that bundles this notebook with its dependencies.\n",
    "    docker_file=\"\"\"\n",
    "    FROM registry.access.redhat.com/ubi8/python-39\n",
    "    RUN pip install ipython nbformat numpy ibm-cos-sdk-core ibm-cos-sdk ibm-watson-machine-learning ibm-watson-studio-pipelines ibmcloudsql pyyaml\n",
    "    ADD ibm-sql-query-cpd.ipynb .\n",
    "    \"\"\"\n",
    "    with open(\"Dockerfile\", \"w\") as text_file:\n",
    "        text_file.write(docker_file)\n",
    "\n",
    "    !docker build -t ibm_sql_query_cpd .\n",
    "    # Stop here: in image-build mode the remaining cells are not meant to run.\n",
    "    exit()\n",
    "else:\n",
    "    # Install the runtime dependencies into the current environment.\n",
    "    !pip install nbformat numpy ibm-cos-sdk-core ibm-cos-sdk ibm-watson-machine-learning ibm-watson-studio-pipelines ibmcloudsql pyyaml\n",
    "    None"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "beginning-wisdom",
   "metadata": {
    "papermill": {
     "duration": 0.164002,
     "end_time": "2021-03-22T20:29:25.951504",
     "exception": false,
     "start_time": "2021-03-22T20:29:25.787502",
     "status": "completed"
    },
    "tags": []
   },
   "outputs": [],
   "source": [
    "import glob\n",
    "import logging\n",
    "import ibmcloudsql\n",
    "from ibmcloudsql import SQLQuery\n",
    "import os\n",
    "import shutil\n",
    "import sys\n",
    "import re\n",
    "from ibm_watson_machine_learning import APIClient\n",
    "from ibm_watson_studio_pipelines import WSPipelines\n",
    "from ibm_watson_studio_pipelines.cpd_paths import CpdScope, CpdPath\n",
    "import yaml"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "abstract-cambridge",
   "metadata": {
    "papermill": {
     "duration": 0.012801,
     "end_time": "2021-03-22T20:29:25.972462",
     "exception": false,
     "start_time": "2021-03-22T20:29:25.959661",
     "status": "completed"
    },
    "tags": []
   },
   "outputs": [],
   "source": [
    "# IBM Cloud API key\n",
    "api_key = os.environ.get('api_key')\n",
    "\n",
    "# COS URL where the results of the SQL job are to be stored\n",
    "target_dir_path = os.environ.get('target_dir_path')\n",
    "\n",
    "# Asset name to register for the results written by the SQL job\n",
    "target_asset_name = os.environ.get('target_asset_name')\n",
    "\n",
    "# SQL statement to execute\n",
    "sql = os.environ.get('sql')\n",
    "\n",
    "# (unique) Cloud Resource Name (CRN) of the IBM SQL Query service instance\n",
    "sql_query_crn = os.environ.get('sql_query_crn')\n",
    "\n",
    "# default: CSV - (will be generated into according STORED AS … clause in the INTO clause)\n",
    "format = os.environ.get('format' , 'CSV')\n",
    "\n",
    "# optional, list of columns to use for partitioning the results of the SQL job (will be generated into according PARTITIONED BY (<columns>) clause in the INTO clause)\n",
    "partition_columns = os.environ.get('partition_columns')\n",
    "\n",
    "# optional, number of objects to store the results of the SQL job in, will be generated into according PARTITIONED INTO <num> OBJECTS clause in INTO clause\n",
    "number_of_objects = int(os.environ.get('number_of_objects', 0))\n",
    "\n",
    "# optional, number of rows to be stored in each result object of the SQL job, will be generated into according PARTITIONED EVERY <num> ROWS clause in INTO clause\n",
    "rows_per_object = int(os.environ.get('rows_per_object', 0))\n",
    "\n",
    "# Boolean flags arrive as text; bool('False') is True, so compare against explicit truthy spellings.\n",
    "# default: False, only valid when none of the above partitioning options is specified, produces exactly one object with the name specified in target_dir_path, will be generated into sqlClient.rename_exact_result(jobid) after SQL has run.\n",
    "exact_name = str(os.environ.get('exact_name', False)).strip().lower() in ('true', '1', 'yes', 'y', 'on')\n",
    "\n",
    "# default: False - will be generated into JOBPREFIX NONE in the INTO clause. Will cause results of previous runs with same output_uri to be overwritten, because no unique sub folder will be created for the result\n",
    "no_jobid_folder = str(os.environ.get('no_jobid_folder', False)).strip().lower() in ('true', '1', 'yes', 'y', 'on')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b6c2ee10-a8b5-48c9-abb1-2943287b680f",
   "metadata": {},
   "outputs": [],
   "source": [
    "\"\"\"\n",
    "Example parameter values for local testing (placeholders only, never commit real credentials):\n",
    "\n",
    "api_key = '<ibm-cloud-api-key>'\n",
    "#token = '<iam-bearer-token>'\n",
    "target_dir_path='cos://eu-de/claimed-test/data.parquet/sql_results'\n",
    "sql='SELECT * FROM cos://eu-de/claimed-test/data.parquet stored as parquet'\n",
    "sql_query_crn='crn:v1:bluemix:public:sql-query:us-south:a/<account-id>:<instance-id>::'\n",
    "target_asset_name='target_asset_name'\n",
    "\"\"\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "55f9b39f-2c8c-4ab5-b4f5-513357bf20ba",
   "metadata": {},
   "outputs": [],
   "source": [
    "logger = logging.getLogger()\n",
    "logger.setLevel(logging.DEBUG)\n",
    "logger.addHandler(logging.StreamHandler(sys.stdout))\n",
    "\n",
    "# Log names only: argument and environment values can carry credentials (e.g. api_key).\n",
    "for arg in sys.argv:\n",
    "    logger.debug(arg.split('=', 1)[0])\n",
    "\n",
    "for env_name in os.environ:\n",
    "    logger.debug(env_name)\n",
    "\n",
    "\n",
    "def _as_bool(value):\n",
    "    \"\"\"Interpret a flag given either as a bool or as text such as 'true', 'False' or '1'.\"\"\"\n",
    "    if isinstance(value, str):\n",
    "        return value.strip().lower() in ('true', '1', 'yes', 'y', 'on')\n",
    "    return bool(value)\n",
    "\n",
    "\n",
    "# Command line arguments of the form name=value override the parameters read from the\n",
    "# environment. Only the first '=' separates name and value, so values may contain '='\n",
    "# themselves (typical for SQL). The value is assigned as a plain string instead of being\n",
    "# passed through exec(), which prevents a crafted value from running arbitrary code.\n",
    "parameter_pattern = re.compile(r'([A-Za-z_][A-Za-z0-9_]*)=(.*)', re.DOTALL)\n",
    "for arg in sys.argv:\n",
    "    parameter_match = parameter_pattern.fullmatch(arg)\n",
    "    if parameter_match is None:\n",
    "        continue\n",
    "    parameter_name, parameter_value = parameter_match.groups()\n",
    "    logging.warning('Parameter: ' + parameter_name)\n",
    "    globals()[parameter_name] = parameter_value\n",
    "\n",
    "# Normalise types only after the overrides were applied; command line values are always\n",
    "# strings, and the later cells compare the numeric parameters against 0.\n",
    "exact_name = _as_bool(exact_name)\n",
    "no_jobid_folder = _as_bool(no_jobid_folder)\n",
    "number_of_objects = int(number_of_objects or 0)\n",
    "rows_per_object = int(rows_per_object or 0)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "welsh-grave",
   "metadata": {
    "papermill": {
     "duration": 4.178678,
     "end_time": "2021-03-22T20:29:30.176328",
     "exception": false,
     "start_time": "2021-03-22T20:29:25.997650",
     "status": "completed"
    },
    "tags": []
   },
   "outputs": [],
   "source": [
    "# Credentials (api_key, token) are deliberately not logged, not even partially.\n",
    "logging.debug(sql_query_crn)\n",
    "\n",
    "\n",
    "def exists(var):\n",
    "    \"\"\"Return True when a global named `var` is defined and its value is not None.\"\"\"\n",
    "    return globals().get(var) is not None\n",
    "\n",
    "\n",
    "# Prefer the API key; fall back to a bearer token when only that was supplied.\n",
    "if exists('api_key'):\n",
    "    sqlClient = SQLQuery(api_key=api_key, instance_crn=sql_query_crn)\n",
    "elif exists('token'):\n",
    "    sqlClient = SQLQuery(api_key=None, token=token, instance_crn=sql_query_crn)\n",
    "else:\n",
    "    raise ValueError('Either api_key or token must be provided')\n",
    "\n",
    "# Append the result clause to the user supplied statement.\n",
    "# NOTE(review): the clauses are emitted in the order INTO, PARTITIONED ..., JOBPREFIX, STORED AS;\n",
    "# confirm this order against the SQL reference of the service before relying on partitioning.\n",
    "sql = sql + ' INTO {}'.format(target_dir_path)\n",
    "\n",
    "# The PARTITIONED keyword is emitted once, by whichever partitioning option comes first.\n",
    "partitioned_by = False\n",
    "\n",
    "if partition_columns is not None and len(partition_columns) > 0:\n",
    "    if not partitioned_by:\n",
    "        sql = sql + ' PARTITIONED'\n",
    "        partitioned_by = True\n",
    "    sql = sql + ' BY ({})'.format(partition_columns)\n",
    "\n",
    "if number_of_objects is not None and number_of_objects > 0:\n",
    "    if not partitioned_by:\n",
    "        sql = sql + ' PARTITIONED'\n",
    "        partitioned_by = True\n",
    "    sql = sql + ' INTO {} OBJECTS'.format(number_of_objects)\n",
    "\n",
    "if rows_per_object is not None and rows_per_object > 0:\n",
    "    if not partitioned_by:\n",
    "        sql = sql + ' PARTITIONED'\n",
    "        partitioned_by = True\n",
    "    sql = sql + ' EVERY {} ROWS'.format(rows_per_object)\n",
    "\n",
    "if no_jobid_folder:\n",
    "    sql = sql + ' JOBPREFIX NONE'\n",
    "\n",
    "sql = sql + ' STORED AS {}'.format(format)\n",
    "\n",
    "if exact_name:\n",
    "    # Submit, wait for completion, then rename the result object via rename_exact_result.\n",
    "    job_id = sqlClient.submit_sql(sql)\n",
    "    job_status = sqlClient.wait_for_job(job_id)\n",
    "    print(\"Job \" + job_id + \" terminated with status: \" + job_status)\n",
    "    sqlClient.rename_exact_result(job_id)\n",
    "else:\n",
    "    sqlClient.run_sql(sql)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f7081e1f-3066-4a94-914c-cc59bfb4021b",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Record the final statement that was submitted, then signal completion.\n",
    "for message in (sql, 'done'):\n",
    "    logging.info(message)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.6"
  },
  "papermill": {
   "default_parameters": {},
   "duration": 470.538548,
   "end_time": "2021-03-22T20:37:13.369954",
   "environment_variables": {},
   "exception": null,
   "input_path": "/home/jovyan/work/examples/pipelines/pairs/component-library/transform/spark-csv-to-parquet.ipynb",
   "output_path": "/home/jovyan/work/examples/pipelines/pairs/component-library/transform/spark-csv-to-parquet.ipynb",
   "parameters": {},
   "start_time": "2021-03-22T20:29:22.831406",
   "version": "2.3.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
